/*
 * Copyright (c) SiteWhere, LLC. All rights reserved. http://www.sitewhere.com
 *
 * The software in this package is published under the terms of the CPAL v1.0
 * license, a copy of which has been included with this distribution in the
 * LICENSE.txt file.
 */
package com.smartthing.device.communication.mqtt;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.fusesource.hawtdispatch.ShutdownException;
import org.fusesource.mqtt.client.FutureConnection;
import org.fusesource.mqtt.client.QoS;

import com.smartthing.spi.SmartThingException;
import com.smartthing.spi.device.IDeviceAssignment;
import com.smartthing.spi.device.IDeviceNestingContext;
import com.smartthing.spi.device.command.IDeviceCommandExecution;
import com.smartthing.spi.device.communication.ICommandDeliveryProvider;
import com.smartthing.spi.server.lifecycle.ILifecycleProgressMonitor;
import com.smartthing.spi.server.lifecycle.LifecycleComponentType;

/**
 * Implementation of {@link ICommandDeliveryProvider} that publishes commands to
 * an MQTT topic so that they can be processed asynchronously by a device
 * listening on the topic.
 * 
 * @author Derek
 */
public class MqttCommandDeliveryProvider extends MqttLifecycleComponent
	implements ICommandDeliveryProvider<byte[], MqttParameters> {

    /** Static logger instance */
    private static final Logger LOGGER = LogManager.getLogger();

    /**
     * MQTT connection used for publishing. Assigned in
     * {@link #start(ILifecycleProgressMonitor)} and cleared again in
     * {@link #stop(ILifecycleProgressMonitor)}; <code>null</code> while the
     * component is not started.
     */
    private FutureConnection connection;

    public MqttCommandDeliveryProvider() {
	super(LifecycleComponentType.CommandDeliveryProvider);
    }

    /**
     * Starts the parent component and then obtains the MQTT connection used
     * for publishing commands.
     * 
     * @param monitor
     *            lifecycle progress monitor passed through to the parent
     * @throws SmartThingException
     *             if the parent start or obtaining the connection fails
     * @see com.smartthing.device.communication.mqtt.MqttLifecycleComponent#start(com.smartthing.spi.server.lifecycle.ILifecycleProgressMonitor)
     */
    @Override
    public void start(ILifecycleProgressMonitor monitor) throws SmartThingException {
	super.start(monitor);

	LOGGER.info("Connecting to MQTT broker at '{}:{}'...", getHostname(), getPort());
	connection = getConnection();
	LOGGER.info("Connected to MQTT broker.");
    }

    /**
     * Returns the logger for this component.
     * 
     * @see com.smartthing.spi.server.lifecycle.ILifecycleComponent#getLogger()
     */
    @Override
    public Logger getLogger() {
	return LOGGER;
    }

    /**
     * Disconnects and kills the MQTT connection (if one was established) and
     * then stops the parent component. Shutdown problems are logged rather
     * than propagated so that the parent is always stopped.
     * 
     * @param monitor
     *            lifecycle progress monitor passed through to the parent
     * @throws SmartThingException
     *             if stopping the parent fails
     * @see com.smartthing.device.communication.mqtt.MqttLifecycleComponent#stop(com.smartthing.spi.server.lifecycle.ILifecycleProgressMonitor)
     */
    @Override
    public void stop(ILifecycleProgressMonitor monitor) throws SmartThingException {
	if (connection != null) {
	    try {
		connection.disconnect().await();
		connection.kill().await();
	    } catch (ShutdownException e) {
		LOGGER.info("Dispatcher has already been shut down.");
	    } catch (InterruptedException e) {
		// Restore the interrupt flag so callers further up can react to it.
		Thread.currentThread().interrupt();
		LOGGER.error("Interrupted while shutting down MQTT command delivery provider.", e);
	    } catch (Exception e) {
		LOGGER.error("Error shutting down MQTT command delivery provider.", e);
	    } finally {
		// Drop the reference so later deliveries fail fast instead of
		// publishing on a connection that has been shut down.
		connection = null;
	    }
	}
	super.stop(monitor);
    }

    /**
     * Publishes an encoded command to the command topic given by the MQTT
     * parameters. The result of the publish call is not awaited.
     * 
     * @param nested
     *            nesting context (not used by this implementation)
     * @param assignment
     *            device assignment (not used by this implementation)
     * @param execution
     *            command execution (not used by this implementation)
     * @param encoded
     *            encoded command payload
     * @param params
     *            MQTT parameters supplying the command topic
     * @throws SmartThingException
     *             if the message can not be handed to the connection, including
     *             when the provider has not been started
     * @see com.smartthing.spi.device.communication.ICommandDeliveryProvider#deliver(
     *      com.smartthing.spi.device.IDeviceNestingContext,
     *      com.smartthing.spi.device.IDeviceAssignment,
     *      com.smartthing.spi.device.command.IDeviceCommandExecution,
     *      java.lang.Object, java.lang.Object)
     */
    @Override
    public void deliver(IDeviceNestingContext nested, IDeviceAssignment assignment, IDeviceCommandExecution execution,
	    byte[] encoded, MqttParameters params) throws SmartThingException {
	try {
	    publish("command", params.getCommandTopic(), encoded);
	} catch (Exception e) {
	    throw new SmartThingException("Unable to publish command to MQTT topic.", e);
	}
    }

    /**
     * Publishes an encoded system command to the system topic given by the
     * MQTT parameters. The result of the publish call is not awaited.
     * 
     * @param nested
     *            nesting context (not used by this implementation)
     * @param assignment
     *            device assignment (not used by this implementation)
     * @param encoded
     *            encoded system command payload
     * @param params
     *            MQTT parameters supplying the system topic
     * @throws SmartThingException
     *             if the message can not be handed to the connection, including
     *             when the provider has not been started
     * @see com.smartthing.spi.device.communication.ICommandDeliveryProvider#deliverSystemCommand(
     *      com.smartthing.spi.device.IDeviceNestingContext,
     *      com.smartthing.spi.device.IDeviceAssignment, java.lang.Object,
     *      java.lang.Object)
     */
    @Override
    public void deliverSystemCommand(IDeviceNestingContext nested, IDeviceAssignment assignment, byte[] encoded,
	    MqttParameters params) throws SmartThingException {
	try {
	    publish("system", params.getSystemTopic(), encoded);
	} catch (Exception e) {
	    throw new SmartThingException("Unable to publish command to MQTT topic.", e);
	}
    }

    /**
     * Publishes a payload to a topic with at-least-once QoS and the retain
     * flag off, logging before and after at debug level.
     * 
     * @param kind
     *            message kind used in the debug log ("command" or "system")
     * @param topic
     *            MQTT topic to publish to
     * @param payload
     *            message payload
     * @throws IllegalStateException
     *             if no connection is available because the provider has not
     *             been started or has been stopped
     */
    private void publish(String kind, String topic, byte[] payload) {
	if (connection == null) {
	    throw new IllegalStateException("MQTT connection is not available. Provider has not been started.");
	}
	LOGGER.debug("About to publish {} message to topic: {}", kind, topic);
	connection.publish(topic, payload, QoS.AT_LEAST_ONCE, false);
	LOGGER.debug("Command published.");
    }
}